22.3 多插件协作

19 分钟阅读

## 22.3.1 插件间通信机制

### 事件总线通信

// src/communication/event-bus.ts

/**
 * In-process publish/subscribe bus that plugins use to exchange events.
 *
 * Listeners are registered per event type. Every published event is also
 * appended to a bounded in-memory history, whether or not anyone listens.
 */
export class PluginEventBus {
  private listeners: Map<string, EventListener[]> = new Map();
  private history: PluginEvent[] = [];
  private maxHistorySize: number;

  /**
   * @param maxHistorySize Maximum number of events retained in the history.
   */
  constructor(maxHistorySize: number = 100) {
    this.maxHistorySize = maxHistorySize;
  }

  /**
   * Registers `listener` for events whose `type` equals `eventType`.
   * Registering the same function twice makes it run twice per event.
   */
  subscribe(eventType: string, listener: EventListener): void {
    const registered = this.listeners.get(eventType);

    if (registered) {
      registered.push(listener);
    } else {
      this.listeners.set(eventType, [listener]);
    }
  }

  /**
   * Removes the first registration of `listener` for `eventType`.
   * Does nothing when the listener is not registered.
   */
  unsubscribe(eventType: string, listener: EventListener): void {
    const registered = this.listeners.get(eventType);

    if (registered) {
      const index = registered.indexOf(listener);
      if (index > -1) {
        registered.splice(index, 1);
      }
    }
  }

  /**
   * Records `event` in the history and delivers it to every listener
   * subscribed to `event.type`.
   *
   * @returns A promise that resolves once all listeners have settled. It does
   *          not reject when a listener fails; failures are logged instead.
   */
  async publish(event: PluginEvent): Promise<void> {
    // Record first so the event is kept even when nobody is listening.
    this.addToHistory(event);

    const registered = this.listeners.get(event.type);

    if (!registered || registered.length === 0) {
      return;
    }

    // Iterate over a copy: a listener may subscribe/unsubscribe while being
    // notified, and mutating the array mid-iteration would skip listeners.
    const notifications = [...registered].map(listener =>
      this.safeNotify(listener, event)
    );

    await Promise.all(notifications);
  }

  /**
   * Invokes one listener and logs, rather than propagates, anything it throws
   * or rejects with, so that one failing listener cannot affect the others.
   */
  private async safeNotify(listener: EventListener, event: PluginEvent): Promise<void> {
    try {
      await listener(event);
    } catch (error) {
      console.error(`Error in event listener for ${event.type}:`, error);
    }
  }

  /**
   * Appends `event` to the history, dropping the oldest entry once the
   * history holds more than `maxHistorySize` events.
   */
  private addToHistory(event: PluginEvent): void {
    this.history.push(event);

    if (this.history.length > this.maxHistorySize) {
      this.history.shift();
    }
  }

  /**
   * Returns the recorded events, oldest first.
   *
   * @param eventType When given, only events of this type are returned.
   * @returns A new array; mutating it does not affect the stored history.
   */
  getHistory(eventType?: string): PluginEvent[] {
    if (eventType) {
      return this.history.filter(event => event.type === eventType);
    }

    return [...this.history];
  }

  /**
   * Discards all recorded events.
   */
  clearHistory(): void {
    this.history = [];
  }
}

/**
 * Callback invoked with each published event of the type it was subscribed
 * to. It may be synchronous or return a promise.
 */
type EventListener = (event: PluginEvent) => Promise<void> | void;

/**
 * Event carried by the plugin event bus. `type` is the key listeners
 * subscribe to; `data` is the untyped payload.
 * NOTE(review): `source` / `target` are presumably plugin names — confirm.
 */
interface PluginEvent { type: string; source: string; target?: string; data: any; timestamp: Date; }

// Usage example
const eventBus = new PluginEventBus();

// Plugin A subscribes to the event
eventBus.subscribe('user.created', async (event) => {
  console.log(`Plugin A received user.created:`, event.data);
});

// Plugin B subscribes to the same event
eventBus.subscribe('user.created', async (event) => {
  console.log(`Plugin B received user.created:`, event.data);
});

// Plugin C publishes the event; both subscribers above are notified
await eventBus.publish({
  type: 'user.created',
  source: 'plugin-c',
  data: { userId: 1, name: 'John' },
  timestamp: new Date()
});

// Inspect the recorded history for this event type
const history = eventBus.getHistory('user.created');
console.log('History:', history);

### 消息队列通信

bash
typescript

// src/communication/message-queue.ts

/**
 * In-memory message queue for plugin-to-plugin messaging.
 *
 * Each named queue is a FIFO list. Every message taken off a queue is handed
 * to all consumers registered for that queue, and the next message is not
 * taken until all of them have settled.
 */
export class PluginMessageQueue {
  private queues: Map<string, Message[]> = new Map();
  private consumers: Map<string, MessageConsumer[]> = new Map();
  // Names of the queues that currently have a drain loop running. Tracked per
  // queue so that a slow consumer on one queue cannot stall another queue.
  private processing: Set<string> = new Set();

  /**
   * Appends `message` to the named queue (creating it on first use) and
   * starts draining the queue if it is not already being drained.
   *
   * If the queue has no consumers yet, the message stays queued.
   */
  send(queueName: string, message: Message): void {
    const queue = this.queues.get(queueName);

    if (queue) {
      queue.push(message);
    } else {
      this.queues.set(queueName, [message]);
    }

    // Fire-and-forget: consumer failures are handled inside safeConsume.
    void this.processQueue(queueName);
  }

  /**
   * Registers `consumer` for the named queue.
   *
   * Registration alone does not trigger delivery: messages already waiting in
   * the queue are delivered the next time `send` is called for this queue.
   */
  receive(queueName: string, consumer: MessageConsumer): void {
    const registered = this.consumers.get(queueName);

    if (registered) {
      registered.push(consumer);
    } else {
      this.consumers.set(queueName, [consumer]);
    }
  }

  /**
   * Drains the named queue: repeatedly takes the oldest message and hands it
   * to every registered consumer, until the queue is empty or has no
   * consumers. At most one drain loop runs per queue at a time.
   */
  private async processQueue(queueName: string): Promise<void> {
    if (this.processing.has(queueName)) {
      // An active loop for this queue will pick up whatever was just queued.
      return;
    }

    this.processing.add(queueName);

    try {
      for (;;) {
        // Looked up on every pass: clearQueue() replaces the array, and
        // consumers may be registered while a message is being handled.
        const queue = this.queues.get(queueName);
        const consumers = this.consumers.get(queueName);

        if (!queue || queue.length === 0 || !consumers || consumers.length === 0) {
          return;
        }

        const message = queue.shift();
        if (message === undefined) {
          return;
        }

        await Promise.all(
          consumers.map(consumer => this.safeConsume(consumer, message))
        );
      }
    } finally {
      this.processing.delete(queueName);
    }
  }

  /**
   * Invokes one consumer and logs, rather than propagates, anything it throws
   * or rejects with, so that one failing consumer cannot affect the others.
   */
  private async safeConsume(consumer: MessageConsumer, message: Message): Promise<void> {
    try {
      await consumer(message);
    } catch (error) {
      console.error(`Error in message consumer:`, error);
    }
  }

  /**
   * Returns the number of messages waiting in the named queue
   * (0 for a queue that does not exist).
   */
  getQueueSize(queueName: string): number {
    const queue = this.queues.get(queueName);
    return queue ? queue.length : 0;
  }

  /**
   * Discards every message waiting in the named queue.
   */
  clearQueue(queueName: string): void {
    this.queues.set(queueName, []);
  }
}

/**
 * Envelope for a message placed on a plugin message queue. The queue passes
 * the whole object to each consumer and does not read any of its fields.
 * NOTE(review): `source` / `target` are presumably plugin names — confirm.
 */
interface Message {
  id: string;
  source: string;
  target?: string;
  data: any;
  timestamp: Date;
}

/**
 * Callback that handles one queued message. It may be synchronous or return
 * a promise.
 */
type MessageConsumer = (message: Message) => Promise<void> | void;

// Usage example
const messageQueue = new PluginMessageQueue();

// Plugin A registers a consumer
messageQueue.receive('user-queue', async (message) => {
  console.log(`Plugin A received message:`, message.data);
});

// Plugin B registers a consumer on the same queue
messageQueue.receive('user-queue', async (message) => {
  console.log(`Plugin B received message:`, message.data);
});

// Plugin C sends a message; both consumers above receive it
messageQueue.send('user-queue', {
  id: 'msg-1',
  source: 'plugin-c',
  data: { userId: 1, name: 'John' },
  timestamp: new Date()
});

// Inspect the queue size. Consumers were registered before send(), so the
// message has already been taken off the queue and this logs 0.
const size = messageQueue.getQueueSize('user-queue');
console.log('Queue size:', size);

### RPC 通信

// src/communication/rpc.ts
/**
 * Registry that lets plugins expose named services and call each other's
 * services by name.
 */
export class PluginRPCService {
  private services = new Map<string, RPCHandler>();
  private clients = new Map<string, RPCClient>();

  /**
   * Binds `handler` to `serviceName`, replacing any handler already bound
   * to that name.
   */
  registerService(serviceName: string, handler: RPCHandler): void {
    this.services.set(serviceName, handler);
  }

  /**
   * Removes the handler bound to `serviceName`, if any.
   */
  unregisterService(serviceName: string): void {
    this.services.delete(serviceName);
  }

  /**
   * Invokes `method` on the named service.
   *
   * @returns Whatever the service handler resolves with.
   * @throws Error (as a rejected promise) when no handler is registered
   *         under `serviceName`.
   */
  async call(serviceName: string, method: string, params: any): Promise<any> {
    const target = this.services.get(serviceName);

    if (target) {
      return target(method, params);
    }

    throw new Error(`Service not found: ${serviceName}`);
  }

  /**
   * Creates a client bound to `serviceName` and remembers it, replacing any
   * client previously created for that name. The service does not have to be
   * registered yet.
   */
  createClient(serviceName: string): RPCClient {
    const created = new RPCClient(this, serviceName);
    this.clients.set(serviceName, created);
    return created;
  }

  /**
   * Returns the client most recently created for `serviceName`, or
   * `undefined` when none has been created.
   */
  getClient(serviceName: string): RPCClient | undefined {
    return this.clients.get(serviceName);
  }
}
/**
 * Service implementation: receives the method name and its parameters and
 * resolves with the call's result.
 */
type RPCHandler = (method: string, params: any) => Promise<any>;
/**
 * Client bound to a single named service of a PluginRPCService.
 */
export class RPCClient {
  constructor(
    private rpcService: PluginRPCService,
    private serviceName: string
  ) {}

  /**
   * Calls `method` on the bound service.
   *
   * @param params Parameters forwarded to the service. When omitted (or
   *               `null`) an empty object is sent instead. Other falsy values
   *               such as `0`, `''` or `false` are forwarded unchanged.
   * @returns Whatever the underlying `rpcService.call` resolves with.
   */
  async call(method: string, params?: any): Promise<any> {
    // `??` rather than `||`: only a missing value is defaulted, so legitimate
    // falsy arguments are not silently replaced by `{}`.
    return this.rpcService.call(this.serviceName, method, params ?? {});
  }
}
// Usage example
const rpcService = new PluginRPCService();
// Plugin A registers a service
rpcService.registerService('user-service', async (method, params) => {
switch (method) {
case 'getUser':
return { id: params.id, name: 'John Doe' };
case 'createUser':
return { id: Date.now(), ...params };
default:
throw new Error(`Unknown method: ${method}`);
}
});
// Plugin B registers a service
rpcService.registerService('order-service', async (method, params) => {
switch (method) {
case 'getOrder':
return { id: params.id, userId: 1, total: 100 };
case 'createOrder':
return { id: Date.now(), ...params };
default:
throw new Error(`Unknown method: ${method}`);
}
});
// Plugin C creates clients and calls both services through them
const userClient = rpcService.createClient('user-service');
const orderClient = rpcService.createClient('order-service');
const user = await userClient.call('getUser', { id: 1 });
console.log('User:', user);
const order = await orderClient.call('getOrder', { id: 1 });
console.log('Order:', order);

## 22.3.2 插件依赖管理

### 依赖解析

bash
typescript

// src/dependencies/dependency-resolver.ts

/**
 * Computes a load order for a set of plugins from their declared
 * dependencies, and reports dependencies that are not registered.
 */
export class PluginDependencyResolver {
  private plugins: Map<string, PluginInfo> = new Map();

  /**
   * Registers a plugin, replacing any plugin already registered under the
   * same name.
   */
  addPlugin(pluginInfo: PluginInfo): void {
    this.plugins.set(pluginInfo.name, pluginInfo);
  }

  /**
   * Unregisters the plugin with the given name, if present.
   */
  removePlugin(pluginName: string): void {
    this.plugins.delete(pluginName);
  }

  /**
   * Returns the names of all registered plugins ordered so that every plugin
   * comes after the registered plugins it depends on.
   *
   * Dependencies that are not registered are left out of the result; use
   * `checkDependencies` to find them.
   *
   * @throws Error when the dependency graph contains a cycle.
   */
  resolve(): string[] {
    const visited = new Set<string>();
    const visiting = new Set<string>();
    const order: string[] = [];

    for (const pluginName of this.plugins.keys()) {
      if (!visited.has(pluginName)) {
        this.visit(pluginName, visited, visiting, order);
      }
    }

    return order;
  }

  /**
   * Depth-first visit used by `resolve`: appends `pluginName` to `order`
   * after all of its dependencies.
   *
   * @param visited  Plugins that are already in `order`.
   * @param visiting Plugins on the current recursion path; meeting one of
   *                 them again means the graph has a cycle.
   */
  private visit(
    pluginName: string,
    visited: Set<string>,
    visiting: Set<string>,
    order: string[]
  ): void {
    if (visiting.has(pluginName)) {
      throw new Error(`Circular dependency detected: ${pluginName}`);
    }

    if (visited.has(pluginName)) {
      return;
    }

    const plugin = this.plugins.get(pluginName);
    if (!plugin) {
      // Unregistered dependency: there is nothing to load, so it must not
      // appear in the load order.
      return;
    }

    visiting.add(pluginName);

    for (const dep of plugin.dependencies ?? []) {
      this.visit(dep, visited, visiting, order);
    }

    visiting.delete(pluginName);
    visited.add(pluginName);
    order.push(pluginName);
  }

  /**
   * Reports every declared dependency that is not a registered plugin.
   *
   * @returns `errors` holds one message per missing dependency; `warnings`
   *          is currently always empty.
   */
  checkDependencies(): DependencyCheckResult {
    const errors: string[] = [];
    const warnings: string[] = [];

    for (const [name, plugin] of this.plugins.entries()) {
      for (const dep of plugin.dependencies ?? []) {
        if (!this.plugins.has(dep)) {
          errors.push(`Plugin ${name} depends on missing plugin: ${dep}`);
        }
      }
    }

    return { errors, warnings };
  }
}

/**
 * Description of a plugin as seen by the dependency resolver.
 * `name` is the key plugins are registered under; `dependencies` lists the
 * names of the plugins this one depends on.
 */
interface PluginInfo {
  name: string;
  version: string;
  dependencies?: string[];
}

/**
 * Result of a dependency check: human-readable error and warning messages.
 */
interface DependencyCheckResult {
  errors: string[];
  warnings: string[];
}

// Usage example
const resolver = new PluginDependencyResolver();

// Register the plugins
resolver.addPlugin({
  name: 'plugin-a',
  version: '1.0.0',
  dependencies: []
});

resolver.addPlugin({
  name: 'plugin-b',
  version: '1.0.0',
  dependencies: ['plugin-a']
});

resolver.addPlugin({
  name: 'plugin-c',
  version: '1.0.0',
  dependencies: ['plugin-a', 'plugin-b']
});

// Check for dependencies that are not registered
const checkResult = resolver.checkDependencies();
console.log('Dependency check:', checkResult);

// Resolve the load order (dependencies first)
const order = resolver.resolve();
console.log('Load order:', order);
// ['plugin-a', 'plugin-b', 'plugin-c']

### 依赖注入

// src/dependencies/dependency-injection.ts
/**
 * Minimal dependency-injection container for plugins: services are
 * registered as factories plus the names of the services they depend on,
 * and are built on demand.
 */
export class PluginDIContainer {
  private services: Map<string, ServiceDefinition> = new Map();
  private instances: Map<string, any> = new Map();
  // Names of the services currently being built, in resolution order. Used
  // to detect circular dependencies instead of recursing until the stack
  // overflows.
  private resolving: Set<string> = new Set();

  /**
   * Registers (or replaces) the definition of the service `name`.
   *
   * Any cached singleton instance for `name` is dropped, so the next
   * `resolve` builds it from the new definition.
   */
  registerService(name: string, definition: ServiceDefinition): void {
    this.services.set(name, definition);
    this.instances.delete(name);
  }

  /**
   * Returns an instance of the service `name`, first resolving its declared
   * dependencies and passing them to the factory in declaration order.
   *
   * Singleton services are built once and cached; all others are built anew
   * on every call.
   *
   * @throws Error when `name` (or one of its dependencies) is not
   *         registered, or when the dependencies form a cycle.
   */
  resolve(name: string): any {
    // Already built (singleton)?
    if (this.instances.has(name)) {
      return this.instances.get(name);
    }

    const definition = this.services.get(name);
    if (!definition) {
      throw new Error(`Service not found: ${name}`);
    }

    if (this.resolving.has(name)) {
      const chain = [...this.resolving, name].join(' -> ');
      throw new Error(`Circular dependency detected: ${chain}`);
    }

    this.resolving.add(name);
    try {
      const dependencies = (definition.dependencies ?? []).map(dep =>
        this.resolve(dep)
      );

      const instance = definition.factory(...dependencies);

      if (definition.singleton) {
        this.instances.set(name, instance);
      }

      return instance;
    } finally {
      // Always unwind, so a failed resolution does not poison later ones.
      this.resolving.delete(name);
    }
  }

  /**
   * Drops every cached singleton instance; service definitions are kept.
   */
  clear(): void {
    this.instances.clear();
  }
}
/**
 * How to build a service: `factory` receives the resolved instances of
 * `dependencies` (in the same order) and returns the service instance.
 * When `singleton` is truthy the container caches the instance.
 */
interface ServiceDefinition {
factory: (...args: any[]) => any;
dependencies?: string[];
singleton?: boolean;
}
// Usage example
const container = new PluginDIContainer();
// Register the services
container.registerService('logger', {
factory: () => ({
log: (message: string) => console.log(`[LOG] ${message}`)
}),
singleton: true
});
container.registerService('database', {
factory: (logger: any) => ({
query: (sql: string) => {
logger.log(`Executing query: ${sql}`);
return [];
}
}),
dependencies: ['logger'],
singleton: true
});
container.registerService('userService', {
factory: (database: any, logger: any) => ({
getUser: (id: number) => {
logger.log(`Getting user ${id}`);
// NOTE(review): SQL built by string interpolation — acceptable only because
// `query` is a stub here; real code should use parameterized queries.
return database.query(`SELECT * FROM users WHERE id = ${id}`);
}
}),
dependencies: ['database', 'logger'],
singleton: false
});
// Resolve a service; its dependencies are built and injected on demand
const userService = container.resolve('userService');
const user = userService.getUser(1);
console.log('User:', user);

## 22.3.3 插件生命周期协调

### 生命周期管理器

bash
typescript

// src/lifecycle/lifecycle-manager.ts

/**
 * Drives a set of plugins through their lifecycle phases (initialize, start,
 * stop, cleanup) in dependency order, and tracks the overall state.
 *
 * Initialize and start run dependencies first; stop and cleanup run in the
 * reverse order so that dependents are shut down before what they depend on.
 */
export class PluginLifecycleManager {
  private plugins: Map<string, ManagedPlugin> = new Map();
  private state: LifecycleState = LifecycleState.IDLE;

  /**
   * Registers a plugin, replacing any plugin already registered under the
   * same name.
   */
  addPlugin(plugin: ManagedPlugin): void {
    this.plugins.set(plugin.name, plugin);
  }

  /**
   * Unregisters the plugin with the given name, if present. No lifecycle
   * hook of the plugin is called.
   */
  removePlugin(pluginName: string): void {
    this.plugins.delete(pluginName);
  }

  /**
   * Calls `initialize()` on every plugin, dependencies first.
   * State: INITIALIZING while running, INITIALIZED on success. If resolving
   * the order or a plugin hook fails, the error propagates and the state
   * stays INITIALIZING.
   */
  async initializeAll(): Promise<void> {
    this.state = LifecycleState.INITIALIZING;
    await this.runPhase('initialize', false);
    this.state = LifecycleState.INITIALIZED;
  }

  /**
   * Calls `start()` on every plugin, dependencies first.
   * State: STARTING while running, RUNNING on success.
   */
  async startAll(): Promise<void> {
    this.state = LifecycleState.STARTING;
    await this.runPhase('start', false);
    this.state = LifecycleState.RUNNING;
  }

  /**
   * Calls `stop()` on every plugin, dependents first.
   * State: STOPPING while running, STOPPED on success.
   */
  async stopAll(): Promise<void> {
    this.state = LifecycleState.STOPPING;
    await this.runPhase('stop', true);
    this.state = LifecycleState.STOPPED;
  }

  /**
   * Calls `cleanup()` on every plugin, dependents first, then returns the
   * state to IDLE. The state is not changed while cleanup is running.
   */
  async cleanupAll(): Promise<void> {
    await this.runPhase('cleanup', true);
    this.state = LifecycleState.IDLE;
  }

  /**
   * Returns the current overall lifecycle state.
   */
  getState(): LifecycleState {
    return this.state;
  }

  /**
   * Builds a fresh dependency resolver from the currently registered plugins
   * and returns the plugin names in dependency order.
   */
  private resolveOrder(): string[] {
    const resolver = new PluginDependencyResolver();

    for (const plugin of this.plugins.values()) {
      resolver.addPlugin({
        name: plugin.name,
        version: plugin.version,
        dependencies: plugin.dependencies
      });
    }

    return resolver.resolve();
  }

  /**
   * Awaits the given lifecycle hook on every registered plugin, one at a
   * time, in dependency order (or the reverse of it when `reversed`).
   */
  private async runPhase(
    hook: 'initialize' | 'start' | 'stop' | 'cleanup',
    reversed: boolean
  ): Promise<void> {
    const order = this.resolveOrder();
    if (reversed) {
      order.reverse();
    }

    for (const pluginName of order) {
      // The resolver may name plugins that are not registered here.
      const plugin = this.plugins.get(pluginName);
      if (plugin) {
        await plugin[hook]();
      }
    }
  }
}

/**
 * A plugin as handled by the lifecycle manager: identity, the names of the
 * plugins it depends on, and one async hook per lifecycle phase.
 */
interface ManagedPlugin {
  name: string;
  version: string;
  dependencies?: string[];
  initialize: () => Promise<void>;
  start: () => Promise<void>;
  stop: () => Promise<void>;
  cleanup: () => Promise<void>;
}

/**
 * Overall state of the plugin set. IDLE is the initial state and the state
 * after cleanup; the *ING values are set while the corresponding phase is
 * running, and INITIALIZED / RUNNING / STOPPED once it has completed.
 */
enum LifecycleState {
  IDLE = 'IDLE',
  INITIALIZING = 'INITIALIZING',
  INITIALIZED = 'INITIALIZED',
  STARTING = 'STARTING',
  RUNNING = 'RUNNING',
  STOPPING = 'STOPPING',
  STOPPED = 'STOPPED'
}

// Usage example
const manager = new PluginLifecycleManager();

// Register the plugins (B depends on A; C depends on A and B)
manager.addPlugin({
  name: 'plugin-a',
  version: '1.0.0',
  dependencies: [],
  initialize: async () => console.log('Plugin A initialized'),
  start: async () => console.log('Plugin A started'),
  stop: async () => console.log('Plugin A stopped'),
  cleanup: async () => console.log('Plugin A cleaned up')
});

manager.addPlugin({
  name: 'plugin-b',
  version: '1.0.0',
  dependencies: ['plugin-a'],
  initialize: async () => console.log('Plugin B initialized'),
  start: async () => console.log('Plugin B started'),
  stop: async () => console.log('Plugin B stopped'),
  cleanup: async () => console.log('Plugin B cleaned up')
});

manager.addPlugin({
  name: 'plugin-c',
  version: '1.0.0',
  dependencies: ['plugin-a', 'plugin-b'],
  initialize: async () => console.log('Plugin C initialized'),
  start: async () => console.log('Plugin C started'),
  stop: async () => console.log('Plugin C stopped'),
  cleanup: async () => console.log('Plugin C cleaned up')
});

// Initialize (dependencies first)
await manager.initializeAll();

// Start (dependencies first)
await manager.startAll();

// Stop (reverse order: dependents first)
await manager.stopAll();

// Clean up (reverse order: dependents first)
await manager.cleanupAll();

### 生命周期事件

// src/lifecycle/lifecycle-events.ts
/**
 * Publish/subscribe hub for plugin lifecycle events. Listeners are
 * registered per lifecycle event type.
 */
export class PluginLifecycleEvents {
  private listeners: Map<string, LifecycleEventListener[]> = new Map();

  /**
   * Registers `listener` for events of type `eventType`.
   * Registering the same function twice makes it run twice per event.
   */
  subscribe(eventType: LifecycleEventType, listener: LifecycleEventListener): void {
    const registered = this.listeners.get(eventType);

    if (registered) {
      registered.push(listener);
    } else {
      this.listeners.set(eventType, [listener]);
    }
  }

  /**
   * Removes the first registration of `listener` for `eventType`.
   * Does nothing when the listener is not registered.
   */
  unsubscribe(eventType: LifecycleEventType, listener: LifecycleEventListener): void {
    const registered = this.listeners.get(eventType);

    if (registered) {
      const index = registered.indexOf(listener);
      if (index > -1) {
        registered.splice(index, 1);
      }
    }
  }

  /**
   * Delivers `event` to every listener subscribed to `event.type`.
   *
   * @returns A promise that resolves once all listeners have settled. It does
   *          not reject when a listener fails; failures are logged instead.
   */
  async emit(event: LifecycleEvent): Promise<void> {
    const registered = this.listeners.get(event.type);

    if (!registered || registered.length === 0) {
      return;
    }

    // Iterate over a copy: a listener may subscribe/unsubscribe while being
    // notified, and mutating the array mid-iteration would skip listeners.
    const notifications = [...registered].map(listener =>
      this.safeNotify(listener, event)
    );

    await Promise.all(notifications);
  }

  /**
   * Invokes one listener and logs, rather than propagates, anything it throws
   * or rejects with, so that one failing listener cannot affect the others.
   */
  private async safeNotify(listener: LifecycleEventListener, event: LifecycleEvent): Promise<void> {
    try {
      await listener(event);
    } catch (error) {
      console.error(`Error in lifecycle event listener:`, error);
    }
  }
}
/**
 * Lifecycle event types: a BEFORE_ / AFTER_ pair for each of the initialize,
 * start, stop and cleanup phases.
 */
enum LifecycleEventType {
BEFORE_INITIALIZE = 'BEFORE_INITIALIZE',
AFTER_INITIALIZE = 'AFTER_INITIALIZE',
BEFORE_START = 'BEFORE_START',
AFTER_START = 'AFTER_START',
BEFORE_STOP = 'BEFORE_STOP',
AFTER_STOP = 'AFTER_STOP',
BEFORE_CLEANUP = 'BEFORE_CLEANUP',
AFTER_CLEANUP = 'AFTER_CLEANUP'
}
/**
 * A lifecycle event: which hook point it is (`type`), the plugin it concerns
 * and when it was created.
 */
interface LifecycleEvent {
type: LifecycleEventType;
pluginName: string;
timestamp: Date;
}
/**
 * Callback invoked with each emitted lifecycle event of the type it was
 * subscribed to. It may be synchronous or return a promise.
 */
type LifecycleEventListener = (event: LifecycleEvent) => Promise<void> | void;
// Usage example
const events = new PluginLifecycleEvents();
// Subscribe to lifecycle events
events.subscribe(LifecycleEventType.BEFORE_START, async (event) => {
console.log(`Before starting plugin: ${event.pluginName}`);
});
events.subscribe(LifecycleEventType.AFTER_START, async (event) => {
console.log(`After starting plugin: ${event.pluginName}`);
});
// Emit the events; each emit resolves after its listeners have settled
await events.emit({
type: LifecycleEventType.BEFORE_START,
pluginName: 'plugin-a',
timestamp: new Date()
});
await events.emit({
type: LifecycleEventType.AFTER_START,
pluginName: 'plugin-a',
timestamp: new Date()
});

标记本节教程为已读

记录您的学习进度,方便后续查看。